今天我們將主要實現出Transformer的完整的Encoder與Decoder架構,而這次的程式碼可說是我們在這30天內接觸的最複雜程序,因為他不僅需要非常清楚了解Transformer的理論,還要有矩陣操作的能力,因此我會盡可能詳細解釋程式碼中的每一部分,來協助你理解每段程式的對應理論。
Transformer
所需要做的資料前處理Mask
的矩陣創立方式與使用Encoder
與Decoder
建立方式這次我們將使用Kondalarao Vonteru的數據集的擴展包進行文本摘要的工作,這個資料集含有約9.8萬條由專業作家所撰寫的新聞及文本摘要,而我們的目的即是利用此資料集來訓練Transformer模型,使我們能夠快速理解文章中的重點,接下來讓我們透過以下的程式碼來建構這個模型:
這次我們將資料儲存於CSV檔案中,將其劃分為Train
與Valid
兩個資料夾,每一個資料夾中,我們都存放了三個CSV檔案,為了讀取這些資料,我們需要透過迴圈操作來執行。在這些CSV檔案中,資料分成了summary(摘要)
與text(原始資料)
兩個部分,因此我們需要將這兩個欄位分開處理,其中text
將會被用作Encoder的輸入,而summary
則會被用作Decoder的輸入。
import pandas as pd
import os
def load_data(path):
x_train, y_train, x_valid, y_valid= [], [], [], []
for types in os.listdir(path):
classes_path = f'{path}/{types}'
for classes in os.listdir(classes_path):
file_path = f'{classes_path}/{classes}'
df = pd.read_csv(file_path).values
input_text, summary = df[:,1], df[:,0] # summary 欄位0 text欄位1
if types == 'Train':
x_train.extend(input_text)
y_train.extend(summary)
else:
x_valid.extend(input_text)
y_valid.extend(summary)
return x_train, y_train, x_valid, y_valid
x_train, y_train, x_valid, y_valid = load_data('SummaryData')
在這次的程式處理過程中,我們無需手動將數據分割成訓練集和驗證集,就像上述程式所展示的,我們可以簡單地通過資料夾名稱迅速切割CSV文件的內容,該程式主要是利用listdir()
方法來取得所有資料夾或文件的名稱,然後在最底層的文件夾中使用read_csv()
來讀取資料。
這一步我相信大家都很熟悉,我們首先透過get_tokenizer()
來進行英文的斷詞工作,然後用vocab
統計這些詞彙已建立起詞彙表,而在這種Encoder-Decoder架構中,我們還需要加入特殊的標籤<SOS>
、<EOS>
,使讓模型能學會這部分的特性,這和我們先前【Day 11】掌握文字翻譯的技術(下)-英法語言翻譯模型使用的技術相同,不過在這裡我並未先用pad_sequence()
來填充這些詞彙,因為這次的資料詞彙量非常大,高達8000個以上如果一次全部填充,那麼會大大增加模型的運算時間。
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import vocab
from collections import Counter
def get_vocab(inputs, tokenizer, train_len, special = ('<PAD>', '<SOS>','<EOS>','<UNK>')):
counter = Counter()
new_inputs = []
for sentence in inputs:
tokens = tokenizer(sentence)
counter.update(tokens)
new_inputs.append(tokens)
token_vocab = vocab(counter, min_freq=5, specials=special)
return token_vocab, new_inputs[:train_len], new_inputs[train_len:]
all_input = x_train + x_valid
all_target = y_train + y_valid
tokenizer = get_tokenizer('basic_english')
input_vocab, x_train, x_valid= get_vocab(all_input, tokenizer, len(x_train))
traget_vocab, y_train, y_valid= get_vocab(all_target, tokenizer, len(y_train))
input_vocab.set_default_index(input_vocab.get_stoi()['<UNK>'])
traget_vocab.set_default_index(traget_vocab.get_stoi()['<UNK>'])
# Ecoder與Decoder的Embedding輸入大小
INPUT_DIM = len(input_vocab)
OUTPUT_DIN = len(traget_vocab)
# 取得給予模型的索引值
SOS_IDX = input_vocab.get_stoi()['<SOS>']
EOS_IDX = input_vocab.get_stoi()['<EOS>']
PAD_IDX = input_vocab.get_stoi()['<PAD>']
為了讓電腦能理解文字,我們先把詞彙轉換成數字。這項轉換過程我們可以透過lookup_indices()
來完成,但在此步驟中我們還需於每一個句子對的最末句中加入<EOS>
這種特殊符號。並且為了節省計算資源,我們還讓單個句子的詞彙數量上限為5000個(如果電腦處理能力不足,可將此數量縮減),若超過此數量的部分,將會直接被切除。
import torch
def token2num(inputs, targets):
encoder_input, decoder_input = [], []
for i in range(len(inputs)):
encoder_in = input_vocab.lookup_indices(inputs[i])[:4999] + [EOS_IDX]
decoder_in = traget_vocab.lookup_indices(targets[i])[:4999] + [EOS_IDX]
encoder_input.append(torch.tensor(encoder_in))
decoder_input.append(torch.tensor(decoder_in))
return encoder_input, decoder_input
x_train, y_train= token2num(x_train, y_train)
x_valid, y_valid= token2num(x_valid, y_valid)
當我們建立好訓練資料與驗證資料後,我們先使用Dataset()
來封裝這些資料。
from torch.utils.data import Dataset, DataLoader
class SummaryeDataset(Dataset):
def __init__(self, x, y):
self.x = x
self.y = y
def __getitem__(self, index):
return self.x[index], self.y[index]
def __len__(self):
return len(self.x)
trainset = SummaryeDataset(x_train, y_train)
validset = SummaryeDataset(x_valid, y_valid)
接下來我們將進行一些特別的處理,在這次的資料前處理中步驟中,因我們並未使用pad_sequence()
,所以我們必須在模型訓練時進行該步驟,由於我們採用的是Encoder-Decoder架構,所以Encoder的輸入大小必須與Decoder的輸入大小相同。因此我們需要先將資料組合起來再使用pad_sequence()
,接著通過split()
將Encoder和Decoder的輸入資料分開。
from torch.nn.utils.rnn import pad_sequence
def collate_fn(batch):
(x, y) = zip(*batch)
pad_data = pad_sequence(x + y, padding_value=PAD_IDX, batch_first=True)
src, tgt = torch.split(pad_data, split_size_or_sections=[len(x), len(y)], dim=0)
return src.permute(1, 0) , tgt.permute(1, 0)
train_loader = DataLoader(trainset, batch_size = 2, shuffle = True, num_workers = 0, pin_memory = True, collate_fn = collate_fn)
valid_loader = DataLoader(validset, batch_size = 2, shuffle = True, num_workers = 0, pin_memory = True, collate_fn = collate_fn)
在這裡我們需要注意幾個細節,當我們填充完資料後,return
時使用了permute(1, 0)
這個動作,這是因為我們的原始輸入維度是(batch_size, seq_len)
,而在Pytorch裡,時序相關的參數大多需要在該模型中設置batch_first=True
才能用這種輸入維度,但這個參數的預設值通常是False
,因此我選擇直接將輸入維度轉變為(seq_len, batch_size)
,這樣在建立複雜的模型時,我們就可以避免過度使用batch_first=True
參數。
建立Positional Encoding的部分主要是實踐該公式的方式,不過我們在這裡仍選擇將整個程式分成多段來講解,以防你無法理解程式的內容。
class PositionalEncoding(nn.Module):
def __init__(self, emb_size, dropout, maxlen = 5000):
super(PositionalEncoding, self).__init__()
首先我們需要理解在Positional Encoding中,一個重要的dmodel
參數,這個參數決定了我們在Encoder及Decoder中給予Positional Encoding的維度大小,因此我們需要傳寫一個emb_size
來獲取該參數,並且在Positional Encoding中我們通常會設置dropout
和maxlen
兩個參數。
dropout
的設置主要是為了防止模型過度擬合。至於maxlen
它的設置源於一個實際問題,由於我們的電腦通常無法負擔過大的計算量,因此當我們無法將輸入的大小合理調整時,我們就需要將它直接截斷,並且該大小的上限必須大於等於我們在【STEP 3】時所設置的長度設定。
den = torch.exp(- torch.arange(0, emb_size, 2)* math.log(10000) / emb_size)
pos = torch.arange(0, maxlen).reshape(maxlen, 1)
pos_embedding = torch.zeros((maxlen, emb_size))
pos_embedding[:, 0::2] = torch.sin(pos * den)
pos_embedding[:, 1::2] = torch.cos(pos * den)
pos_embedding = pos_embedding.unsqueeze(-2)
self.dropout = nn.Dropout(dropout)
self.register_buffer('pos_embedding', pos_embedding)
在這個程式的內部,首先是將所有的輸入通過公式計算成固定位置的數值,該公式也就是我們昨天所提到的轉換公式,接著我們需要計算出包含位置信息的張量pos
,該變數的目的是通過sin()
與cos()
方法來計算張量在奇數和偶數列中的位置信息,此外我們還需要擴展整體維度的向量來符合後續Transformer的運算需求。
並且在這裡,我們還使用了一個特別的技巧,即self.register_buffer
,它的功能是使定義的參數不能被更新,這是因為在Positional Encoding中,位置資訊是不能被更動的。
def forward(self, token_embedding: Tensor):
return self.dropout(token_embedding + self.pos_embedding[:token_embedding.size(0), :])
在該模型的前向傳播過程中,操作相當簡單了,我們只需將輸入的embedding向量與對應的位置訊息進行結合,當我們完成這個步驟後,該模型的詞嵌入向量就已經附加了位置訊息。
我們昨天在Transformer中,由於需建立Encoder和Decoder的詞嵌入層,因此我們將其規劃為一個獨立的類別,在這裡,可以看到一個特別的操作是math.sqrt(self.emb_size)
,這個操作主要用來調整嵌入向量的尺度,與q
、k
向量的縮放作法相似。
class TokenEmbedding(nn.Module):
def __init__(self, vocab_size: int, emb_size):
super(TokenEmbedding, self).__init__()
self.embedding = nn.Embedding(vocab_size, emb_size)
self.emb_size = emb_size
def forward(self, tokens: Tensor):
return self.embedding(tokens.long()) * math.sqrt(self.emb_size)
首先我們來介紹傳入該模型的超參數,在Transformer中,我們不僅可以控制Multi-head attention中head的數量,還能控制其Encoder和Decoder的層數,越多層的Transformer計算會更抽象,因此需要大量的實驗才會知道結果,至於其餘的參數,看到這邊的你應該已經有相當瞭解了所以不再多做解釋了。
class Seq2SeqTransformer(nn.Module):
def __init__(self, num_encoder_layers, # Encoder數量
num_decoder_layers, # Decoder數量
emb_size, # Embedding輸出
nhead, # head的數量
src_vocab_size, # Encoder Embedding大小
tgt_vocab_size, # Deocder Embedding大小
dim_feedforward = 512, # feedforward神經元數量
dropout = 0.1, # 每層丟棄多少神經元
):
super(Seq2SeqTransformer, self).__init__()
在一個Pytorch的Transformer類別中,我們需要定義出emb_size
、nhead
和dim_feedforward
這幾個參數,在原始的論文中,作者設定的nhead
數量是8,dim_feedforward
數量是2048,在這裡一樣是經過實驗才會知道他的效果,若沒有想法時直接使用預設值在後在使用窮舉法測試就是一個很好的實驗方式。
我們還需要要注意的是num_encoder_layers
與num_decoder_layers
這兩個參數,它們分別代表Encoder和Decoder的模型架構數量,在Transformer類別中主要有兩種宣告方式,一種是你可以自行建立這些模型後,將它們放入到Transformer中,而另一種就是直接給予數字,那麼會直接按照預設來幫你建立Encoder和Decoder。
self.transformer = Transformer(d_model=emb_size,
nhead=nhead,
num_encoder_layers=num_encoder_layers,
num_decoder_layers=num_decoder_layers,
dim_feedforward=dim_feedforward,
dropout=dropout)
其餘的層數就很好理解了,主要包含詞嵌入層以及Positional Encoding,其中generator
則是指在Decoder輸出時的全連接層。
self.generator = nn.Linear(emb_size, tgt_vocab_size)
self.src_tok_emb = TokenEmbedding(src_vocab_size, emb_size)
self.tgt_tok_emb = TokenEmbedding(tgt_vocab_size, emb_size)
self.positional_encoding = PositionalEncoding(
emb_size, dropout=dropout)
前向傳播的處理方式相對較為複雜,因為我們需要考慮到Decoder中的生成方法來處理,為此我們需要運用到兩種MASK
,分別是mask
和padding_mask
。
其中padding_mask可以理解為忽略PAD_IDX的索引,而src_mask
、tgt_mask
的建立就變得稍微複雜些,因為我們需要創建一個能夠遮蔽輸入的矩陣,在通常情況下src_mask
不需要遮蔽任何值,而tgt_mask
則需要建立一個與Encoder相對應的矩陣,關於這種建立方式,我會在後面進一步解釋。
def forward(self,
src, # Encoder輸入
trg, # Decoder輸入
src_mask, # Encoder輸入忽略的訊息
tgt_mask, # Decoder輸入忽略的訊息
src_padding_mask, # Encoder輸入忽略PAD_IDX的索引
tgt_padding_mask, # Decoder輸入忽略PAD_IDX的索引
memory_key_padding_mask):
src_emb = self.positional_encoding(self.src_tok_emb(src))
tgt_emb = self.positional_encoding(self.tgt_tok_emb(trg))
outs = self.transformer(src_emb, tgt_emb, src_mask, tgt_mask, None,
src_padding_mask, tgt_padding_mask, memory_key_padding_mask)
return self.generator(outs)
首先,在建立tgt_mask
的過程中,我們只需要了解輸入陣列的長度,昨天我們提到,針對Decoder的第i
個輸出,我們需要對i+1
及其之後的文字位置進行Mask的操作(圖片中左上座標為0, 0)
這種語句在矩陣上的實現方式,就是把該矩陣的下三角部分全都改為0(代表不遮蔽)
,而對於該矩陣的解讀我們要輸出第2
個文字(X軸為2)時需要使用3
個Mask遮罩(Y軸為3),以此類推就能夠完成上述矩陣的建立。
而在程式中建立該舉證的最快方式就是建立一個全為1
的矩陣,接下來直接通過triu()
的方式將下三角改為0
,如此一來就能滿足我們的輸入需求。
def generate_square_subsequent_mask(sz):
mask = (torch.triu(torch.ones((sz, sz), device=device)) == 1).transpose(0, 1)
mask = mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0))
return mask
不過,在Pytorch的對於浮點運算時,float('-inf')
才代表被保留,而其餘的則保持不變。因此我們需要把0
的部分修改為float('-inf')
,1
的部分修改為0
。
至於剩下的Mask建立方式就很簡單了,因為我們的Encoder不需被遮被,所以只需要建立一個全都是0
的矩陣即可,而padding_mask
就只需要找到PAD_IDX就能夠處理了。
def create_mask(src, tgt):
src_seq_len = src.shape[0]
tgt_seq_len = tgt.shape[0]
tgt_mask = generate_square_subsequent_mask(tgt_seq_len)
src_mask = torch.zeros((src_seq_len, src_seq_len),device=device).type(torch.bool)
src_padding_mask = (src == PAD_IDX).transpose(0, 1)
tgt_padding_mask = (tgt == PAD_IDX).transpose(0, 1)
return src_mask, tgt_mask, src_padding_mask, tgt_padding_mask
在訓練方式上,我們依然使用原本的方法,但在訓練時我們需要對Decoder的tgt
進行處理。這是因為tgt_input
提供了先前已知的目標序列,相較之下tgt_out
提供了模型所預期的下一個詞彙,所以兩者在時間序列中會有一個時間差,因此模型會在根據tgt_input
進行預測後,需要轉換序列才能對同樣序列的tgt_out
進行損失計算。
def train(epoch):
train_loss = 0
train_pbar = tqdm(train_loader, position=0, leave=True)
model.train()
for input_datas in train_pbar:
src, tgt = [i.to(device) for i in input_datas]
tgt_input = tgt[:-1, :]
src_mask, tgt_mask, src_padding_mask, tgt_padding_mask = create_mask(src, tgt_input)
logits = model(src, tgt_input, src_mask, tgt_mask,src_padding_mask, tgt_padding_mask, src_padding_mask)
optimizer.zero_grad()
tgt_out = tgt[1:, :]
loss = criterion(logits.reshape(-1, logits.shape[-1]), tgt_out.reshape(-1))
loss.backward()
optimizer.step()
train_pbar.set_description(f'Train Epoch {epoch}')
train_pbar.set_postfix({'loss':f'{loss:.3f}'})
train_loss += loss.item()
return train_loss/len(train_loader)
本次的訓練方式與我們在【Day 11】掌握文字翻譯的技術(下)-英法語言翻譯模型】)所介紹的完全相同,不過需要注意的是,本次的訓練量特別大,故訓練所需的時間可能較長,若你的電腦硬體負荷不起,可以考慮減少文本中的字數或是降低模型的層數來進行訓練,以下是訓練的程式碼:
epochs = 100 # 訓練次數
early_stopping = 10 # 模型訓練幾次沒進步就停止
stop_cnt = 0 # 計數模型是否有進步的計數器
model_path = 'model.ckpt' # 模型存放路徑
show_loss = False # 是否顯示訓練折線圖
best_loss = float('inf') # 最佳的Loss
loss_record = {'train':[], 'valid':[]} # 訓練紀錄
for epoch in range(epochs):
train_loss = train(epoch)
valid_loss = valid(epoch)
loss_record['train'].append(train_loss)
loss_record['valid'].append(valid_loss)
# 儲存最佳的模型權重
if valid_loss < best_loss:
best_loss = valid_loss
torch.save(model.state_dict(), 'e' + model_path)
print(f'Saving Model With Loss {best_loss:.5f}')
stop_cnt = 0
else:
stop_cnt+=1
# Early stopping
if stop_cnt == early_stopping:
output = "Model can't improve, stop training"
print('-' * (len(output)+2))
print(f'|{output}|')
print('-' * (len(output)+2))
break
print(f'Train Loss: {train_loss:.5f}' , end='| ')
print(f'Valid Loss: {valid_loss:.5f}' , end='| ')
print(f'Best Loss: {best_loss:.5f}', end='\n\n')
if show_loss:
show_training_loss(loss_record)
程式執行完成後,我們即能見到以下的訓練結果,這時我們便能利用該模型來進行貪婪解碼或進行其他更佳的文字生成操作,對於該部分我在此就不再詳細說明,如果你對如何生成感興趣,可以進一步觀看我在GitHub中存放的程式碼。
Train Epoch 67: 100%|██████████| 45869/45869 [21:57<00:00, 34.82it/s, loss=0.121]
Valid Epoch 67: 100%|██████████| 56887/56887 [08:40<00:00, 109.26it/s, loss=0.162]
Train Loss: 0.12940| Valid Loss: 0.14608| Best Loss: 0.14608
你有沒有發現,雖然這次的程式碼與Seq2Seq時十分相似,但所需的處理動作卻更多?這個問題存在的原因是Transformer並沒有時間序列的概念,因此在處理上,需要使用到大量的矩陣進行相乘與計算,而這也是Transformer的一大特點,因為在GPU上執行矩陣運算的速度通常是最快的,所以與時間序列模型相比,我們可以看出,雖然該模型的運算量大幅增加,但它的運算速度卻比Seq2Seq快其效能也更好,而明天我將教你使用Transformer的熱門預訓練模型BERT。
那麼我們明天再見!
內容中的程式碼都能從我的GitHub上取得:
https://github.com/AUSTIN2526/iThome2023-learn-NLP-in-30-days